We demonstrate how you can manage your own contextual multi-armed bandit workflow on SageMaker using the built-in Vowpal Wabbit (VW) container to train and deploy contextual bandit models. We show how to train these models that interact with a live environment (using a simulated client application) and continuously update the model with efficient exploration.
Contextual bandits come in handy wherever we want to personalize content for a user (content layout, ads, search, product recommendations, etc.). Traditional personalization methods collect a training dataset, build a model, and deploy it to generate recommendations. However, the training algorithm does not tell us how to collect this dataset, especially in a production system where poor recommendations lead to lost revenue. Contextual bandit algorithms help us collect this data strategically by trading off between exploiting known information and exploring recommendations that may yield higher benefits. The collected data is used to update the personalization model online. Contextual bandits therefore help us train a personalization model while minimizing the impact of poor recommendations.
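The exploration-exploitation trade-off described above can be illustrated with an epsilon-greedy rule. This is only a minimal sketch, not the algorithm VW is configured with in this notebook; `epsilon_greedy`, its parameters, and the reward estimates are hypothetical names and values chosen for illustration.

```python
import random

def epsilon_greedy(estimated_rewards, epsilon, rng):
    """Pick an arm and return (arm, probability of having picked that arm).

    estimated_rewards: current reward estimate per arm (illustrative input).
    epsilon: fraction of traffic spent on uniform exploration.
    """
    num_arms = len(estimated_rewards)
    best_arm = max(range(num_arms), key=lambda a: estimated_rewards[a])
    if rng.random() < epsilon:
        arm = rng.randrange(num_arms)   # explore: any arm, uniformly
    else:
        arm = best_arm                  # exploit: best arm so far
    # Probability of the chosen arm under this rule. Logging it alongside
    # the action is what allows offline evaluation of other policies later.
    prob = epsilon / num_arms + (1.0 - epsilon if arm == best_arm else 0.0)
    return arm, prob

rng = random.Random(0)
arm, prob = epsilon_greedy([0.1, 0.7, 0.3], epsilon=0.2, rng=rng)
```

A larger `epsilon` collects more information about rarely chosen arms at the price of serving more recommendations that are currently believed to be worse.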
To implement the exploration-exploitation strategy, we need an iterative training and deployment system that: (1) recommends an action using the contextual bandit model based on user context, (2) captures the implicit feedback over time and (3) continuously trains the model with incremental interaction data. In this notebook, we show how to set up the infrastructure needed for such an iterative learning system. While the example demonstrates a bandits application, these continual learning systems are useful more generally in dynamic scenarios where models need to be continually updated to capture recent trends in the data (e.g. tracking fraud behaviors based on detection mechanisms or tracking user interests over time).
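The three steps above can be sketched as a small in-memory simulation. Everything here is a toy stand-in under stated assumptions: the "model" is a lookup table rather than a VW model, the two contexts and the `true_best` mapping are invented, and `recommend`, `train` and `run` are hypothetical helper names.

```python
import random

def recommend(model, context, num_arms, epsilon, rng):
    """Step 1: pick an action for this context from the deployed model."""
    if rng.random() < epsilon or context not in model:
        return rng.randrange(num_arms)      # explore (or no knowledge yet)
    return model[context]                   # exploit

def train(stats, batch):
    """Step 3: fold a batch of (context, action, reward) into running
    statistics and return a model mapping each context to its best arm."""
    for context, action, reward in batch:
        total, count = stats.get((context, action), (0.0, 0))
        stats[(context, action)] = (total + reward, count + 1)
    model, best_mean = {}, {}
    for (context, action), (total, count) in stats.items():
        mean = total / count
        if context not in model or mean > best_mean[context]:
            model[context], best_mean[context] = action, mean
    return model

def run(num_rounds=5, batch_size=200, num_arms=3, epsilon=0.2, seed=0):
    rng = random.Random(seed)
    true_best = {0: 2, 1: 0}                # hidden from the model
    stats, model, history = {}, {}, []
    for _ in range(num_rounds):
        batch = []
        for _ in range(batch_size):
            context = rng.choice([0, 1])
            action = recommend(model, context, num_arms, epsilon, rng)
            # Step 2: the environment returns feedback for the chosen action.
            reward = 1.0 if action == true_best[context] else 0.0
            batch.append((context, action, reward))
        history.append(sum(r for _, _, r in batch) / batch_size)
        model = train(stats, batch)         # "redeploy" the updated model
    return model, history

model, history = run()
```

`history` holds the average reward per round, so it shows how the served recommendations improve as the model is retrained on each new batch.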
In a typical supervised learning setup, the model is trained with a SageMaker training job and it is hosted behind a SageMaker hosting endpoint. The client application calls the endpoint for inference and receives a response. In bandits, the client application also sends the reward (a score assigned to each recommendation generated by the model) back for subsequent model training. These rewards will be part of the dataset for the subsequent model training.
The contextual bandit training workflow is controlled by an experiment manager provided with this example. The client application (say a recommender system application) pings the SageMaker hosting endpoint that is serving the bandits model. The application sends the state (user features) as input and receives an action (recommendation) as a response. The client application sends the recommended action to the user and stores the received reward in S3. The SageMaker hosted endpoint also stores inference data (state and action) in S3. The experiment manager joins the inference data with rewards as they become available. The joined data is used to update the model with a SageMaker training job. The updated model is evaluated offline and deployed to SageMaker hosting endpoint if the model evaluation score improves upon prior models.
Below is an overview of the subsequent cells in the notebook:
In all notebooks, the cells follow a color convention:
To facilitate experimentation, we provide a local_mode that runs the contextual bandit example using the SageMaker Notebook instance itself instead of SageMaker training and hosting instances. The workflow remains the same in local_mode, but runs much faster for small datasets. Hence, it is a useful tool for experimentation and debugging. However, it will not scale to production use cases with high throughput and large datasets.
In local_mode, training, evaluation and hosting are done with the SageMaker VW Docker container. The join is not handled by SageMaker; it is done inside the client application. The rest of the textual explanation assumes that the notebook is run in SageMaker mode.
import yaml
import sys
import numpy as np
import time
import sagemaker
import pprint
import pandas as pd
sys.path.append('common')
sys.path.append('common/sagemaker_rl')
from misc import get_execution_role
from markdown_helper import *
# Import display helpers from IPython.display (IPython.core.display is deprecated for these names)
from IPython.display import Markdown, Image, display, HTML
<div class="alert alert-block alert-danger"> IMPORTANT: You need to run the notebook with SageMaker only, so you need to configure the following value in the config.yaml file
!pygmentize 'config.yaml'
config_file = 'config.yaml'
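with open(config_file, 'r') as yaml_file:
    # safe_load parses plain YAML only; yaml.load() without a Loader fails on recent PyYAML versions
    config = yaml.safe_load(yaml_file)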
Let's do a deep dive on the Vowpal Wabbit (VW) configuration!
A container for Vowpal Wabbit is already provisioned by AWS and is available here:
462105765813.dkr.ecr.<region>.amazonaws.com/sagemaker-rl-vw-container:vw-8.7.0-cpu.
This container is built and maintained by AWS teams, but you can access the Dockerfile!
VW supports multiple Contextual Bandit algorithms:
Please make sure that the num_arms parameter in the config is equal to the number of actions in the client application (which is defined in the cell below).
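The num_arms requirement can be checked up front with a small helper. The key path below is the one this notebook reads when it asserts the same condition against the simulation application; `check_num_arms` and `example_config` are hypothetical names, and the config fragment shows only the keys needed for the check.

```python
def check_num_arms(config, num_actions):
    """Raise if the configured arm count differs from the app's action count."""
    configured = config["algor"]["algorithms_parameters"]["num_arms"]
    if configured != num_actions:
        raise ValueError(
            "num_arms is {} in the config but the client application "
            "exposes {} actions".format(configured, num_actions)
        )
    return configured

# Illustrative fragment only; the real config.yaml contains more settings.
example_config = {"algor": {"algorithms_parameters": {"num_arms": 7}}}
check_num_arms(example_config, num_actions=7)
```

Failing early with an explicit message is easier to diagnose than a mismatch that only surfaces once the model returns an action the client application does not know.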
Either get the execution role when running from a SageMaker notebook role = sagemaker.get_execution_role() or, when running from local machine, use utils method role = get_execution_role('role_name') to create an execution role.
try:
    sagemaker_role = sagemaker.get_execution_role()
except Exception:
    # Not running from a SageMaker notebook: fall back to the helper imported from misc
    sagemaker_role = get_execution_role('sagemaker')
print("Using Sagemaker IAM role arn: \n{}".format(sagemaker_role))
The Statlog shuttle data set used in this example can be seen as a multi-class classification problem with 9 features and 7 classes. In the classification problem, the algorithm receives the features and the correct label for each data point.
Here, we convert this multi-class classification problem to a bandit problem.
The algorithm picks one of the label (arm) options given the features (context). If this matches the class in the original data point, a reward of one is assigned. If not, a reward of zero is assigned. </div/>
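The conversion can be sketched in a few lines. The feature values below are made up, the arm numbering 1 to 7 is an assumption, and `bandit_reward` is a hypothetical helper rather than code from the simulation application.

```python
def bandit_reward(chosen_arm, true_label):
    """Reward 1 when the chosen arm matches the true class, else 0."""
    return 1 if chosen_arm == true_label else 0

# One labeled example: 9 made-up feature values and true class 4.
features, true_label = [50, 21, 77, 0, 28, 0, 27, 48, 22], 4

# Reward each of the 7 arms would have earned for this example.
rewards_by_arm = {arm: bandit_reward(arm, true_label) for arm in range(1, 8)}
```

In the classification setting the learner sees the whole `rewards_by_arm` table, because it is given the label. In the bandit setting it sees only the single entry for the arm it actually chose, which is why exploration is needed.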
display(Image('images/AIM404-classification-bandits.png'))
The client application simulates a live environment that uses the SageMaker bandits model to serve recommendations to users. The logic of reward generation resides in the client application.
The workflow of the client application is as follows:
event_id is a unique identifier for each interaction. It is used to join inference data <state, action, action probability> with the rewards.
In a later cell of this notebook, where there exists a hosted endpoint, we illustrate how the client application interacts with the endpoint and gets the recommended action.
The shuttle data set is stored under sim_app/shuttle.trn
There are two scripts that help us simulate our application:
sys.path.append('sim_app')
from statlog_sim_app import StatlogSimApp
!pygmentize sim_app/sim_app_utils.py
!pygmentize sim_app/statlog_sim_app.py
ExperimentManager is the top-level class for all the Bandits/RL and continual learning workflows. Similar to the estimators in the SageMaker Python SDK, ExperimentManager contains methods for training, deployment and evaluation. It keeps track of the job status and reflects current progress in the workflow.
Start the application using the ExperimentManager class. Information about the class can be found here: /common/sagemaker_rl/orchestrator/workflow/manager/experiment_manager.py
Training
Inference
Data Aggregation
Logging
Clean-up
from orchestrator.workflow.manager.experiment_manager import ExperimentManager
The initialization below will set up an AWS CloudFormation stack of additional resources.
<div class="alert alert-block alert-danger"> IMPORTANT: Enter the name of your experiment here:
experiment_name = "AIM404-sbs" #YOUR EXPERIMENT NAME HERE - can be AIM404-1
bandits_experiment = ExperimentManager(config, experiment_id=experiment_name)
To start a new experiment, we need to initialize the first model. In the case where historical data is available and is in the format of <state, action, action probability, reward>, we can warm start by learning the policy offline. Otherwise, we can initiate a random policy.
Warm start the policy
We showcase the warm start by generating a batch of randomly selected samples with size batch_size. Then we split it into a training set and an evaluation set using the parameter ratio.
From sim_app/sim_app_utils.py, we use a tool that simulates an application.
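The role of the ratio parameter can be sketched as follows. This is an assumption about the general idea only: how `ingest_joined_data` performs the split internally (for example, whether it shuffles) is not shown in this notebook, and `split_by_ratio` is a hypothetical helper.

```python
import random

def split_by_ratio(records, ratio, seed=0):
    """Shuffle the records and return (training part, evaluation part)."""
    shuffled = list(records)
    random.Random(seed).shuffle(shuffled)    # seeded for reproducibility
    cut = int(len(shuffled) * ratio)         # first `ratio` share goes to training
    return shuffled[:cut], shuffled[cut:]

# A batch of 100 records split with ratio 0.8
train_set, eval_set = split_by_ratio(range(100), ratio=0.8)
```

With a batch of 100 records and a ratio of 0.8, 80 records go to training and 20 are held out for evaluation.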
display(Image('images/AIM404-model-initialization.png'))
from sim_app_utils import *
batch_size = 100
warm_start_data_buffer = prepare_statlog_warm_start_data(data_file='sim_app/shuttle.trn', batch_size=batch_size)
# upload to s3
bandits_experiment.ingest_joined_data(warm_start_data_buffer,ratio=0.8)
The prepare_statlog_warm_start_data() method randomly picks an action and generates the reward associated with the action picked!
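A minimal sketch of what such warm start records could look like is given below. It is not the implementation in sim_app_utils.py: the field names, the uniform choice over arms numbered 1 to num_arms, and the helper name `make_warm_start_records` are all assumptions made for illustration.

```python
import random

def make_warm_start_records(examples, num_arms, seed=0):
    """Turn labeled examples into <state, action, action probability, reward>."""
    rng = random.Random(seed)
    records = []
    for features, true_label in examples:
        action = rng.randint(1, num_arms)        # random policy: uniform over arms
        records.append({
            "state": features,
            "action": action,
            "action_prob": 1.0 / num_arms,       # probability of that uniform choice
            "reward": 1 if action == true_label else 0,
        })
    return records

# Three made-up labeled examples (features, true class)
examples = [([0.1, 0.2], 1), ([0.3, 0.4], 3), ([0.5, 0.6], 2)]
records = make_warm_start_records(examples, num_arms=7)
```

Recording the action probability matters even for a random policy, because offline evaluation reweights each logged reward by that probability.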
It is composed of the following elements :
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(bandits_experiment._jsonify())
bandits_experiment.initialize_first_model(input_data_s3_prefix=bandits_experiment.last_joined_job_train_data)
The experiment manager relies on RLEstimator from the SageMaker SDK for training and deployment.
The custom training code is in train-vw.py, located in src. </div/>
In the experiment workflow, we have trained a specific model
pp.pprint(bandits_experiment._jsonify())
Evaluate current model against historical model
After every training cycle, we check whether the newly trained model is better than the one currently deployed. Using the evaluation dataset, we estimate how the new model would perform compared to the currently deployed model. SageMaker RL supports offline evaluation by performing counterfactual analysis (CFA). By default, we apply the doubly robust (DR) estimation method. The bandit policy tries to minimize the cost (1-reward) in this case, so a smaller evaluation score indicates better policy performance.
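The idea behind counterfactual evaluation can be sketched with two textbook estimators for a deterministic target policy: inverse propensity scoring (IPS) and doubly robust (DR). This is a sketch under stated assumptions, not the code the evaluation job runs: the logged tuples, the two policies, and the constant cost model are invented, and `ips_cost` and `dr_cost` are hypothetical names.

```python
def ips_cost(logged, policy):
    """IPS estimate of the average cost (1 - reward) of `policy`."""
    total = 0.0
    for context, action, prob, reward in logged:
        if policy(context) == action:            # only matching actions contribute
            total += (1.0 - reward) / prob       # reweight by logging probability
    return total / len(logged)

def dr_cost(logged, policy, cost_model):
    """DR estimate: model prediction plus an IPS-weighted correction."""
    total = 0.0
    for context, action, prob, reward in logged:
        chosen = policy(context)
        estimate = cost_model(context, chosen)   # model-based guess
        if chosen == action:
            # Correct the guess with the observed cost where we have one.
            estimate += ((1.0 - reward) - cost_model(context, action)) / prob
        total += estimate
    return total / len(logged)

# Logged <context, action, action probability, reward> from a uniform policy
# over two arms; the correct arm equals the context.
logged = [(0, 0, 0.5, 1), (0, 1, 0.5, 0), (1, 1, 0.5, 1), (1, 0, 0.5, 0)]
always_right = lambda context: context
always_wrong = lambda context: 1 - context
flat_model = lambda context, action: 0.5         # deliberately uninformative

scores = {
    "ips_right": ips_cost(logged, always_right),
    "ips_wrong": ips_cost(logged, always_wrong),
    "dr_right": dr_cost(logged, always_right, flat_model),
    "dr_wrong": dr_cost(logged, always_wrong, flat_model),
}
```

Both estimators score a candidate policy using only data logged by a different policy, which is why the action probability has to be stored with every inference.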
display(Image('images/AIM404-model-initialization-evaluate.png'))
# evaluate the current model by launching a training job on Sagemaker
bandits_experiment.evaluate_model(
    input_data_s3_prefix=bandits_experiment.last_joined_job_eval_data,
    evaluate_model_id=bandits_experiment.last_trained_model_id)
The experiment manager uses the same estimator, RLEstimator, for evaluation and training.
The difference is the custom code used as the container entry point, which is now eval-cfa-vw.py in src.
Once again, the evaluation score of our model should be between 0 and 1. The smaller, the better. </div/>
# now get the evaluation score
eval_score_last_trained_model = bandits_experiment.get_eval_score(
    evaluate_model_id=bandits_experiment.last_trained_model_id,
    eval_data_path=bandits_experiment.last_joined_job_eval_data
)
Download the last joined data used for evaluation into the statlog_warm_start.data file
download_historical_data_from_s3(data_s3_prefix=bandits_experiment.last_joined_job_eval_data)
Check what is inside
pd.read_csv("./statlog_warm_start.data", sep=',')
# get baseline performance from the historical (warm start) data (cost = 1-mean(reward))
baseline_score = evaluate_historical_data(data_file='statlog_warm_start.data')
baseline_score
Now we can compare the value from eval_score_last_trained_model with the baseline_score. Can you see an improvement? :)
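As a sanity check on the baseline, the cost of logged data is one minus the mean reward. If the warm start actions are drawn uniformly at random over 7 arms, an action matches the true class about one time in seven, so the baseline should be close to 6/7 (about 0.857). The helper name `baseline_cost` is hypothetical; it is not the `evaluate_historical_data` implementation.

```python
def baseline_cost(rewards):
    """Average cost of logged interactions: 1 - mean(reward)."""
    return 1.0 - sum(rewards) / len(rewards)

# Seven logged rewards with exactly one hit, the expected rate under a
# uniform random policy over 7 arms.
example_cost = baseline_cost([1, 0, 0, 0, 0, 0, 0])
```

A trained model whose evaluation score is clearly below this value has learned something beyond random guessing.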
# Check the model_id of the last model trained.
bandits_experiment.last_trained_model_id
pp.pprint(bandits_experiment._jsonify())
Once training and evaluation are done, we can deploy the model.
display(Image('images/AIM404-model-initialization-deploy.png'))
bandits_experiment.deploy_model(model_id=bandits_experiment.last_trained_model_id)
The SageMaker hosting endpoint saves all inferences <eventID, state, action, action probability> to S3 using Kinesis Firehose.
This Firehose stream is created during endpoint deployment.
The experiment manager creates the SageMaker endpoint using the Model class from the SageMaker SDK. </div/>
You can check the experiment state at any point by executing:
pp.pprint(bandits_experiment._jsonify())
The model just trained appears in both last_trained_model_id and last_hosted_model_id.
Now that the last trained model is hosted, the client application can send the state to the endpoint and receive the recommended action. There are 7 classes in the statlog data, corresponding to 7 actions.
display(Image('images/AIM404-inference.png'))
Here we use the Predictor class from the SageMaker Python SDK. </div/>
predictor = bandits_experiment.predictor
sim_app = StatlogSimApp(predictor=predictor)
Make sure that num_arms specified in config.yaml is equal to the total unique actions in the simulation application.
assert sim_app.num_actions == bandits_experiment.config["algor"]["algorithms_parameters"]["num_arms"]
user_id, user_context = sim_app.choose_random_user()
action, event_id, model_id, action_prob, sample_prob = predictor.get_action(obs=user_context)
# Print the prediction response
print('Selected action: {}, event ID: {}, model ID: {}, probability: {}'.format(action, event_id, model_id, action_prob))
The client application generates a reward after receiving the recommended action and stores the tuple <eventID, reward> in S3. In this case, the reward is 1 if the predicted action is the true class, and 0 otherwise. The experiment manager joins the reward with the state, action and action probability using Amazon Athena.
display(Image('images/AIM404-reward.png'))
local_mode = bandits_experiment.local_mode
batch_size = 500 # collect 500 data instances
print("Collecting batch of experience data...")
# Generate experiences and log them
for i in range(batch_size):
    user_id, user_context = sim_app.choose_random_user()
    action, event_id, model_id, action_prob, sample_prob = predictor.get_action(obs=user_context.tolist())
    reward = sim_app.get_reward(user_id, action, event_id, model_id, action_prob, sample_prob, local_mode)
Once the data from the inference and reward phases is on S3, we need to aggregate it into the following format: <eventID, state, action, action probability, reward>.
This step is done with Amazon Athena </div/>
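The join itself can be sketched in memory as an inner join on the event ID. In the workflow this is performed by Amazon Athena over the data in S3; the function name, the record field names, and the decision to skip events without a reward are assumptions made for illustration.

```python
def join_on_event_id(inferences, rewards):
    """Attach each reward to the inference that shares its event_id."""
    reward_by_event = {r["event_id"]: r["reward"] for r in rewards}
    joined = []
    for inference in inferences:
        event_id = inference["event_id"]
        if event_id in reward_by_event:      # skip events with no reward yet
            row = dict(inference)            # copy so the input is not mutated
            row["reward"] = reward_by_event[event_id]
            joined.append(row)
    return joined

# Made-up inference and reward records
inferences = [
    {"event_id": "e1", "state": [0.1, 0.2], "action": 3, "action_prob": 0.8},
    {"event_id": "e2", "state": [0.4, 0.5], "action": 1, "action_prob": 0.1},
]
rewards = [{"event_id": "e1", "reward": 1}]
joined = join_on_event_id(inferences, rewards)
```

Keying both streams on the event ID is what lets rewards arrive later than the inference they belong to.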
display(Image('images/AIM404-join.png'))
# Join (observation, action) with rewards (can be delayed) and upload the data to S3
if local_mode:
    bandits_experiment.ingest_joined_data(sim_app.joined_data_buffer)
else:
    print("Waiting for firehose to flush data to s3...")
    # Wait for firehose to flush data to S3
    # (the VW container image receives the firehose stream name as an environment variable)
    time.sleep(60)
    rewards_s3_prefix = bandits_experiment.ingest_rewards(sim_app.rewards_buffer)
    print(rewards_s3_prefix)
    bandits_experiment.join(rewards_s3_prefix)

sim_app.clear_buffer()
bandits_experiment.last_joined_job_train_data
# Check the workflow to see if join job has completed successfully
pp.pprint(bandits_experiment._jsonify())
Now we can train a new model with newly collected experiences, and host the resulting model.
display(Image('images/AIM404-model-retraining.png'))
bandits_experiment.train_next_model(input_data_s3_prefix=bandits_experiment.last_joined_job_train_data)
bandits_experiment.last_trained_model_id
Now we will update the model behind the same SageMaker endpoint. We can check how blue/green deployment works on SageMaker by analysing the following elements:
- SageMaker Endpoints
- SageMaker Endpoint Configurations
- SageMaker Models
To check that no errors occur during the blue/green deployment phase, you will send inference requests to the endpoint in a loop.
<div class="alert alert-block alert-danger"> To do so, go to the Inference Loop notebook </div>
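Such a request loop can be sketched as follows. It is not the content of the Inference Loop notebook: `send_requests` and `fake_get_action` are hypothetical, and the fake only mimics the five-value return of `predictor.get_action` used earlier in this notebook.

```python
import time

def send_requests(get_action, contexts, pause_seconds=0.0):
    """Call the endpoint once per context; count answers per model and failures."""
    served_by, failures = {}, 0
    for context in contexts:
        try:
            action, event_id, model_id, action_prob, sample_prob = get_action(obs=context)
            served_by[model_id] = served_by.get(model_id, 0) + 1
        except Exception:
            failures += 1                    # a failed request during the switch
        time.sleep(pause_seconds)
    return served_by, failures

# Stand-in for predictor.get_action so the sketch runs without an endpoint.
def fake_get_action(obs):
    return 1, "event-0", "model-a", 0.9, 0.5

served_by, failures = send_requests(fake_get_action, [[0] * 9 for _ in range(3)])
```

During a blue/green switch, `failures` should stay at zero and `served_by` should show the model ID changing from the old model to the new one.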
Now you can start the deployment of the newly trained model and use CloudWatch to check that everything works as expected.
<div class="alert alert-block alert-danger"">
# deployment takes ~10 min if `local_mode` is False
bandits_experiment.deploy_model(model_id=bandits_experiment.last_trained_model_id)
bandits_experiment.last_hosted_model_id
<div class="alert alert-block alert-danger"> Go to the End-to-End notebook, AIM404-End_2_end_loop, for the rest of the workshop </div>
We have three DynamoDB tables (experiment, join, model) from the bandits application above (e.g. experiment_id='bandits-exp-1'). To better maintain them, we should remove the related records if the experiment has finished. Besides, having an endpoint running will incur costs. Therefore, we delete these components as part of the clean up process.
Only execute the clean up cells below when you've finished the current experiment and want to remove everything associated with it. After the cleanup, the CloudWatch metrics will no longer be populated. </div/>
bandits_experiment.clean_resource(experiment_id=bandits_experiment.experiment_id)
bandits_experiment.clean_table_records(experiment_id=bandits_experiment.experiment_id)